// Copyright (c) 2020-present,  INSPUR Co, Ltd.  All rights reserved.
// This source code is licensed under Apache 2.0 License.

#include "range_arena.h"

#include <algorithm>
#include <cstdlib>
#include <cstring>

#include "db/dbformat.h"
#include "port/port.h"
#include "pure_mem/pmemrep.h"
#include "pure_mem/version_node.h"
#include "rocksdb/env.h"
#include "util/logging.h"
#include "util/sync_point.h"

namespace rocksdb {

#ifndef _MSC_VER
#endif

// Constructs an arena covering the key range [startkey, endkey).
//
// block_size  - size in bytes of the reserved sorted-KV buffer.
// list_size_  - extra capacity added on top of block_size to form the
//               maximum size of the current memory block.
// startkey/endkey - range boundary keys; deep-copied because the caller's
//               Slices may not outlive this arena. Freed in ~RangeArena().
// tracker     - allocation tracker (currently unused in this translation
//               unit; presumably consumed elsewhere — TODO confirm).
RangeArena::RangeArena(size_t block_size, size_t list_size_, const Slice& startkey,
                       const Slice& endkey, AllocTracker* tracker)
    : current_mem_block_max_size_(block_size + list_size_),
      kSortedReservedKVSize_(block_size),
      sorted_re_kv_offset_(0) {
  current_block_kv_size_ = 0;

  // Reserved buffer that backs the sorted KV entries. operator new[] throws
  // std::bad_alloc on failure, so the former `assert(ptr != nullptr)` was
  // dead code and has been removed.
  sorted_resevered_kv_ = new char[kSortedReservedKVSize_];

  // Deep-copy both boundary keys (consistently via the cached sizes).
  const size_t kStartSize = startkey.size();
  char* start = new char[kStartSize];
  memcpy(start, startkey.data(), kStartSize);
  current_block_key_start_ = Slice(start, kStartSize);

  const size_t kEndSize = endkey.size();
  char* end = new char[kEndSize];
  memcpy(end, endkey.data(), kEndSize);
  current_block_key_end_ = Slice(end, kEndSize);
}

// Releases every buffer this arena owns: the key buffers still parked in the
// temporary overflow list, the reserved sorted-KV buffer, and the deep copies
// of the range boundary keys made in the constructor.
RangeArena::~RangeArena() {
  void* node = nullptr;
  while (!kv_tmp_list.empty()) {
    kv_tmp_list.sweepHead(node);
    auto cur = (VersionNode*)node;
    delete[] cur->Key();
  }

  // Fix: the reserved KV buffer allocated in the constructor was never
  // released, leaking kSortedReservedKVSize_ bytes per arena.
  // NOTE(review): assumes ownership of this buffer stays with the arena —
  // confirm no other component frees it.
  delete[] sorted_resevered_kv_;

  delete[] current_block_key_start_.data();
  delete[] current_block_key_end_.data();
}

void RangeArena::AddData2OKList(uint32_t length, const char* handle) {
  // std::unique_lock<std::mutex> locker(add_data_mutex_);
  if (handle < sorted_resevered_kv_ || handle >= sorted_resevered_kv_ + kSortedReservedKVSize_){
      // 不在正常内存空间中分配的空间
      allocate_ok_times.fetch_add(1);
      return;
  }
  AddKVEvent cur(length, handle);
  add_kv_OK_list_.insertHead(cur);
}

// Lock-free allocation of `bytes` for a KV entry.
//
// On success *buf points at the allocated storage. Returns true ("arena is
// full") when the reserved buffer could not serve the request — i.e. the
// arena is closed to inserts, not in the normal state, or the reserved
// buffer is exhausted (in which case the data goes to the temporary block).
// Returns false when the bytes were claimed from the reserved buffer.
bool RangeArena::Allocate(size_t bytes, void* node, char** buf) {
  // std::unique_lock<std::mutex> locker(cur_mutex_);
  bool raIsFull = true;

  while (true) {
      *buf = nullptr;
      if (IsNotAllowedInsert()) {
        return raIsFull;
      }

      if (!IsNormal()) {  // The data is temporarily stored in a temporary space
        *buf = this->allocAtTmpBlock(bytes, node);
        return raIsFull;
      }

      size_t cur_offset, next_offset;
      cur_offset = sorted_re_kv_offset_.load();
      next_offset = cur_offset + bytes;
      // Fix: the allocation occupies [cur_offset, next_offset), so a request
      // whose end lands exactly on the buffer boundary still fits. The old
      // `<` comparison rejected it, wasting the tail of the reserved buffer.
      if (next_offset <= kSortedReservedKVSize_) {  // Reserved MEM space fits
          *buf = &sorted_resevered_kv_[cur_offset];  // Stored in the reserved mem
          // Claim [cur_offset, next_offset) atomically; on a lost race another
          // thread moved the offset, so retry with the fresh value.
          bool ok = sorted_re_kv_offset_.compare_exchange_strong(cur_offset, next_offset);
          if (!ok) {
              continue;
          }
          current_block_kv_size_.fetch_add(bytes);
          return !raIsFull;
      } else {
          // Reserved buffer exhausted: spill into the temporary block.
          *buf = this->allocAtTmpBlock(bytes, node);
          return raIsFull;
      }
  }
}

// Pops at most one pending event off the OK list. On success `entry` points
// at the popped handle and the caller's flushed-count is advanced; otherwise
// `entry` is left null. Always reports Status::OK().
Status RangeArena::GetNeedFlushData(const char* &entry,
                                    size_t &kv_flush_last_size) {
  entry = nullptr;
  if (add_kv_OK_list_.empty()) {
    return Status::OK();
  }
  AddKVEvent popped;
  if (add_kv_OK_list_.sweepHead(popped)) {
    entry = popped.handle;
    kv_flush_last_size += 1;
  }
  return Status::OK();
}

// A flush is needed once more events have accumulated on the OK list than
// the caller has already accounted for.
bool RangeArena::IsNeedFlush(size_t kv_flush_last_size) {
  return kv_flush_last_size < add_kv_OK_list_.size();
}

// True once the current memory block has entered the deleting state.
bool RangeArena::IsNeedDelete() const {
  const auto state = current_block_state_.load();
  return state == MEMORY_BLOCK_IS_DELETING;
}

// True while the block is in any transient state (neither OK nor deleting).
//
// Fix: the original loaded the atomic `current_block_state_` twice in one
// expression, so a concurrent transition between the two loads could make
// the predicate evaluate against two different states (e.g. observe
// "not OK" and "not DELETING" even though the block only ever held those
// two values). Snapshot the state once and test the snapshot.
bool RangeArena::IsHandling() const {
  const auto state = current_block_state_.load();
  return state != MEMORY_BLOCK_OK && state != MEMORY_BLOCK_IS_DELETING;
}
}  // namespace rocksdb